Motion-tracking robotic arm controlled by AI — mirrors your movements in real time and executes natural language commands.
A Raspberry Pi-powered robotic arm that uses your laptop as the AI brain. It can mirror your arm movements via webcam, or respond to natural language commands through AI.
A webcam watches your arm. MediaPipe AI tracks your shoulder, elbow, wrist, and fingers in real time. The robot arm copies your movements at 30fps.
Type or speak commands in plain English — "wave at me", "pick up the bottle", "look to the left". Claude AI translates these into servo movements.
A pan/tilt camera bracket on the arm can auto-track faces or colored objects, keeping the subject centered in frame as it moves.
A built-in web control panel (Flask, port 5000) lets you manually slide each servo, run preset moves, or send AI commands — open http://localhost:5000 on the laptop, or use the laptop's IP from any other device on your home network.
Your Lenovo laptop acts as the server. The Pi connects over WiFi via WebSocket. The Pi auto-reconnects if the server restarts — no manual reboot needed.
This is phase 1 of a full-body AI robot. The same WebSocket + servo architecture scales to legs, torso, and head by adding more PCA9685 channels.
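As a sketch of that scaling: a second PCA9685 at a different I2C address (0x41 here, set via the board's address solder jumpers) simply adds six more channels. The phase-2 joint names below are hypothetical placeholders, not part of the current build:

```python
# Hypothetical full-body channel map — the first six entries mirror the
# phase-1 wiring table; the second board's joints are placeholders.
FIRST_BOARD, SECOND_BOARD = 0x40, 0x41  # 0x40 is the PCA9685 default address

JOINT_MAP = {
    # joint:      (I2C address, channel)
    "shoulder":   (FIRST_BOARD, 0),
    "elbow":      (FIRST_BOARD, 1),
    "wrist":      (FIRST_BOARD, 2),
    "gripper":    (FIRST_BOARD, 3),
    "cam_pan":    (FIRST_BOARD, 4),
    "cam_tilt":   (FIRST_BOARD, 5),
    # phase 2 — second PCA9685 (names are illustrative):
    "hip_left":   (SECOND_BOARD, 0),
    "knee_left":  (SECOND_BOARD, 1),
    "hip_right":  (SECOND_BOARD, 2),
    "knee_right": (SECOND_BOARD, 3),
    "neck_pan":   (SECOND_BOARD, 4),
    "neck_tilt":  (SECOND_BOARD, 5),
}

def lookup(joint):
    """Return (I2C address, channel) for a joint name."""
    return JOINT_MAP[joint]
```

The WebSocket message format stays identical — only the joint-name-to-channel lookup grows.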
All parts available on Amazon. Total lands right in the $100–$120 budget range.
Two separate power rails, I2C communication, and 6 servo channels.
| FROM (Pi Pin) | TO (PCA9685) | NOTES |
|---|---|---|
| Pin 1 (3.3V) | VCC | Logic power |
| Pin 6 (GND) | GND | Shared ground |
| Pin 3 (SDA) | SDA | I2C data |
| Pin 5 (SCL) | SCL | I2C clock |
| FROM | TO | NOTES |
|---|---|---|
| 6V supply (+) | V+ terminal | Servo power only |
| 6V supply (−) | GND terminal | Must share ground with the Pi |
| CHANNEL | JOINT | NOTES |
|---|---|---|
| CH 0 | Shoulder | Arm kit servo 1 |
| CH 1 | Elbow | Arm kit servo 2 |
| CH 2 | Wrist | Arm kit servo 3 |
| CH 3 | Gripper | Arm kit servo 4 |
| CH 4 | Camera Pan | Pan/tilt bracket |
| CH 5 | Camera Tilt | Pan/tilt bracket |
Follow this order exactly. Don't skip to the Pi until your laptop server is working.
Before building, a quick configuration checklist:

- When flashing the Pi's OS, set the username to `pi` and a password you'll remember.
- Change `SERVER_IP = "192.168.1.XXX"` in `arm_controller.py` to your laptop's actual IP.
- Calibrate `SERVO_MIN_PULSE` and `SERVO_MAX_PULSE` in `arm_controller.py` for your servo batch.
- Tune `INTERPOLATION_SPEED` — lower for smoother movement, higher for more responsive.

The project consists of 5 files, shown below.
```python
# ============================================================
# server.py — Motion tracking + WebSocket server
# Run this on your Lenovo laptop FIRST
# Install: pip install mediapipe opencv-python websockets numpy
# ============================================================
import cv2
import mediapipe as mp
import asyncio
import websockets
import websockets.exceptions
import json
import numpy as np
import logging

logging.basicConfig(level=logging.INFO,
                    format="[%(asctime)s] %(levelname)s: %(message)s")
log = logging.getLogger("server")

mp_pose = mp.solutions.pose
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils

connected_clients = set()
server_running = True
CAMERA_INDEX = 0
WEBSOCKET_PORT = 8765


def angle_between(a, b, c):
    """Calculate joint angle at B from points A-B-C."""
    a = np.array([a.x, a.y])
    b = np.array([b.x, b.y])
    c = np.array([c.x, c.y])
    ba = a - b
    bc = c - b
    cosine = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-6)
    cosine = np.clip(cosine, -1.0, 1.0)
    return float(np.degrees(np.arccos(cosine)))


def map_range(value, in_min, in_max, out_min, out_max):
    """Remap a value from one range to another."""
    value = np.clip(value, in_min, in_max)
    percent = (value - in_min) / (in_max - in_min)
    return int(out_min + percent * (out_max - out_min))


async def broadcast(data: dict):
    """Send JSON to all connected Pi devices."""
    if not connected_clients:
        return
    message = json.dumps(data)
    dead = set()
    for client in connected_clients:
        try:
            await client.send(message)
        except websockets.exceptions.ConnectionClosed:
            dead.add(client)
    for d in dead:
        connected_clients.discard(d)


async def handle_client(websocket):
    connected_clients.add(websocket)
    log.info(f"Pi connected | Total: {len(connected_clients)}")
    try:
        async for msg in websocket:
            log.info(f"Pi status: {msg}")
    finally:
        connected_clients.discard(websocket)


async def run_pose_tracking():
    cap = cv2.VideoCapture(CAMERA_INDEX)
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
    pose = mp_pose.Pose(min_detection_confidence=0.65,
                        min_tracking_confidence=0.65)
    hands = mp_hands.Hands(max_num_hands=1, min_detection_confidence=0.7)
    prev = {"shoulder": 90, "elbow": 90, "wrist": 90, "gripper": 90}
    smooth = 0.4  # exponential smoothing factor (0 = frozen, 1 = no smoothing)
    while server_running:
        ret, frame = cap.read()
        if not ret:
            await asyncio.sleep(0.05)
            continue
        frame = cv2.flip(frame, 1)
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        rgb.flags.writeable = False
        pose_res = pose.process(rgb)
        hands_res = hands.process(rgb)
        rgb.flags.writeable = True
        angles = dict(prev)
        if pose_res.pose_landmarks:
            lm = pose_res.pose_landmarks.landmark
            PL = mp_pose.PoseLandmark
            rs, re, rw, rh, ri = (lm[PL.RIGHT_SHOULDER], lm[PL.RIGHT_ELBOW],
                                  lm[PL.RIGHT_WRIST], lm[PL.RIGHT_HIP],
                                  lm[PL.RIGHT_INDEX])
            if all(p.visibility > 0.5 for p in [rs, re, rw]):
                angles["shoulder"] = int(smooth * map_range(angle_between(rh, rs, re), 30, 160, 20, 160)
                                         + (1 - smooth) * prev["shoulder"])
                angles["elbow"] = int(smooth * map_range(angle_between(rs, re, rw), 20, 160, 20, 160)
                                      + (1 - smooth) * prev["elbow"])
                angles["wrist"] = int(smooth * map_range(angle_between(re, rw, ri), 30, 150, 30, 150)
                                      + (1 - smooth) * prev["wrist"])
            mp_drawing.draw_landmarks(frame, pose_res.pose_landmarks,
                                      mp_pose.POSE_CONNECTIONS)
        if hands_res.multi_hand_landmarks:
            h_lm = hands_res.multi_hand_landmarks[0].landmark
            tt = h_lm[mp_hands.HandLandmark.THUMB_TIP]
            it = h_lm[mp_hands.HandLandmark.INDEX_FINGER_TIP]
            dist = np.sqrt((tt.x - it.x) ** 2 + (tt.y - it.y) ** 2)
            angles["gripper"] = int(smooth * map_range(dist * 100, 2, 20, 10, 120)
                                    + (1 - smooth) * prev["gripper"])
        prev = dict(angles)
        await broadcast({"mode": "mirror", **angles})
        cv2.imshow("Bionicle Arm Tracking", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
        await asyncio.sleep(0.01)
    cap.release()
    cv2.destroyAllWindows()


async def main():
    server = await websockets.serve(handle_client, "0.0.0.0",
                                    WEBSOCKET_PORT, ping_interval=20)
    log.info(f"Server ready on port {WEBSOCKET_PORT}")
    await asyncio.gather(server.wait_closed(), run_pose_tracking())


if __name__ == "__main__":
    asyncio.run(main())
```
```python
# ============================================================
# arm_controller.py — Servo controller for Raspberry Pi
# Run AFTER server.py is running on the laptop
# Install: pip install websockets adafruit-circuitpython-pca9685
#          adafruit-circuitpython-motor
# ============================================================
import asyncio, websockets, websockets.exceptions, json, logging
import board, busio
from adafruit_pca9685 import PCA9685
from adafruit_motor import servo as adafruit_servo

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("arm")

# ── CONFIG — edit SERVER_IP to your laptop's local IP ──────
SERVER_IP = "192.168.1.XXX"   # ← CHANGE THIS
SERVER_PORT = 8765
SERVER_URL = f"ws://{SERVER_IP}:{SERVER_PORT}"
INTERPOLATION_SPEED = 0.18
RECONNECT_DELAY = 3.0

# ── HARDWARE INIT ──────────────────────────────────────────
i2c = busio.I2C(board.SCL, board.SDA)
pca = PCA9685(i2c)
pca.frequency = 50

def make_servo(ch):
    return adafruit_servo.Servo(pca.channels[ch], min_pulse=500,
                                max_pulse=2400, actuation_range=180)

servo_shoulder = make_servo(0)
servo_elbow = make_servo(1)
servo_wrist = make_servo(2)
servo_gripper = make_servo(3)
servo_cam_pan = make_servo(4)
servo_cam_tilt = make_servo(5)

# ── SMOOTH MOVEMENT STATE ──────────────────────────────────
current = {"shoulder": 90.0, "elbow": 90.0, "wrist": 90.0,
           "gripper": 90.0, "pan": 90.0, "tilt": 90.0}
target = dict(current)

def clamp(v, lo=0, hi=180):
    return max(lo, min(hi, v))

def smooth_step(cur, tgt, spd):
    diff = tgt - cur
    return tgt if abs(diff) < 0.5 else cur + diff * spd

async def run_smooth_interpolation():
    """60 Hz loop that moves servos toward their target angles."""
    while True:
        for k in current:
            current[k] = smooth_step(current[k], target[k], INTERPOLATION_SPEED)
        servo_shoulder.angle = clamp(current["shoulder"])
        servo_elbow.angle = clamp(current["elbow"])
        servo_wrist.angle = clamp(current["wrist"])
        servo_gripper.angle = clamp(current["gripper"])
        servo_cam_pan.angle = clamp(current["pan"])
        servo_cam_tilt.angle = clamp(current["tilt"])
        await asyncio.sleep(1 / 60)

# ── PRESET MOVEMENTS ───────────────────────────────────────
def go_home():
    for k in target:
        target[k] = 90.0

def execute_wave():
    async def _w():
        for _ in range(4):
            target["shoulder"] = 130; await asyncio.sleep(0.4)
            target["shoulder"] = 60; await asyncio.sleep(0.4)
        go_home()
    asyncio.ensure_future(_w())

def execute_pickup():
    async def _p():
        target["gripper"] = 10; await asyncio.sleep(0.6)
        target["elbow"] = 35; await asyncio.sleep(0.8)
        target["gripper"] = 110; await asyncio.sleep(0.5)
        target["elbow"] = 90; await asyncio.sleep(0.7)
    asyncio.ensure_future(_p())

COMMAND_MAP = {
    "wave": execute_wave,
    "pickup": execute_pickup,
    "home": go_home,
    "point": lambda: target.update({"shoulder": 90, "elbow": 165, "wrist": 90}),
    "reach": lambda: target.update({"shoulder": 80, "elbow": 150, "gripper": 60}),
    "thumbs_up": lambda: target.update({"wrist": 45, "gripper": 100}),
}

def handle_message(data):
    mode = data.get("mode")
    if mode == "mirror":
        for k in ["shoulder", "elbow", "wrist", "gripper"]:
            if k in data:
                target[k] = clamp(data[k])
    elif mode == "ai_command":
        cmd = data.get("command", "").lower()
        if cmd in COMMAND_MAP:
            COMMAND_MAP[cmd]()
    elif mode == "camera":
        if "pan" in data:
            target["pan"] = clamp(data["pan"])
        if "tilt" in data:
            target["tilt"] = clamp(data["tilt"])

async def connect_to_server():
    while True:
        try:
            async with websockets.connect(SERVER_URL, ping_interval=20) as ws:
                log.info("Connected to server!")
                go_home()
                await ws.send(json.dumps({"status": "ready"}))
                async for msg in ws:
                    try:
                        handle_message(json.loads(msg))
                    except Exception:
                        pass  # ignore malformed messages
        except Exception as e:
            log.warning(f"Disconnected: {e}. Retrying in {RECONNECT_DELAY}s...")
            await asyncio.sleep(RECONNECT_DELAY)

async def main():
    await asyncio.gather(run_smooth_interpolation(), connect_to_server())

if __name__ == "__main__":
    asyncio.run(main())
```
```python
# ============================================================
# ai_commands.py — Natural language → robot commands via AI
# Install: pip install anthropic
# ============================================================
import anthropic, json, asyncio, logging, os

log = logging.getLogger("ai_commands")
client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))

SYSTEM_PROMPT = """You control a robotic arm. Convert commands to JSON ONLY.
No explanation, no markdown, just raw JSON. Formats:
{"mode": "ai_command", "command": "wave"}
{"mode": "ai_command", "command": "pickup"}
{"mode": "ai_command", "command": "home"}
{"mode": "ai_command", "command": "point"}
{"mode": "ai_command", "command": "reach"}
{"mode": "ai_command", "command": "thumbs_up"}
{"mode": "camera", "pan": 0-180, "tilt": 0-180}
{"mode": "mirror", "shoulder": 0-180, "elbow": 0-180, "wrist": 0-180, "gripper": 0-180}
Camera direction shortcuts: left=pan30, right=pan150, up=tilt50, down=tilt130, center=pan90+tilt90
If unclear, return: {"mode": "ai_command", "command": "home"}"""

async def process_command(text: str, broadcast_fn=None) -> dict:
    """Convert plain English to a robot command JSON and optionally broadcast it."""
    log.info(f"AI processing: '{text}'")
    try:
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=150,
            system=SYSTEM_PROMPT,
            messages=[{"role": "user", "content": text}],
        )
        command = json.loads(response.content[0].text.strip())
        if broadcast_fn:
            await broadcast_fn(command)
        return command
    except json.JSONDecodeError:
        fallback = {"mode": "ai_command", "command": "home"}
        if broadcast_fn:
            await broadcast_fn(fallback)
        return fallback
    except anthropic.AuthenticationError:
        log.error("Bad API key. Set ANTHROPIC_API_KEY environment variable.")
        return {}
    except Exception as e:
        log.error(f"AI error: {e}")
        return {}

async def run_cli(broadcast_fn=None):
    """Interactive terminal — type commands in plain English."""
    print("\n=== Bionicle Arm AI Interface ===\n"
          "Type commands like: wave at me | pick up the bottle | look left\n"
          "Type 'quit' to exit.\n")
    loop = asyncio.get_running_loop()
    while True:
        text = await loop.run_in_executor(None, lambda: input("Command > ").strip())
        if text.lower() in ("quit", "exit"):
            break
        if text:
            print(f" → {await process_command(text, broadcast_fn)}\n")

if __name__ == "__main__":
    asyncio.run(run_cli())
```
```python
# ============================================================
# camera_tracker.py — Auto face/object tracking camera
# Runs on the Pi — moves pan/tilt to follow targets
# ============================================================
import cv2
import numpy as np
import asyncio
import board, busio
from adafruit_pca9685 import PCA9685
from adafruit_motor import servo as adafruit_servo

CAMERA_INDEX = 0
DEAD_ZONE_X = 0.08   # 8% of frame — prevents center jitter
DEAD_ZONE_Y = 0.08
PAN_SPEED = 0.04
TILT_SPEED = 0.04
PAN_MIN, PAN_MAX = 20, 160
TILT_MIN, TILT_MAX = 40, 140

# Change these HSV values to track a different color object
COLOR_LOWER = np.array([5, 100, 100])    # Orange (low)
COLOR_UPPER = np.array([25, 255, 255])   # Orange (high)

i2c = busio.I2C(board.SCL, board.SDA)
pca = PCA9685(i2c)
pca.frequency = 50
servo_pan = adafruit_servo.Servo(pca.channels[4], min_pulse=500, max_pulse=2400)
servo_tilt = adafruit_servo.Servo(pca.channels[5], min_pulse=500, max_pulse=2400)

face_cascade = cv2.CascadeClassifier(
    cv2.data.haarcascades + "haarcascade_frontalface_default.xml")
pan_angle = 90.0
tilt_angle = 90.0

def detect_faces(gray):
    faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1,
                                          minNeighbors=5, minSize=(40, 40))
    return faces if len(faces) > 0 else []

def detect_color_object(hsv):
    mask = cv2.inRange(hsv, COLOR_LOWER, COLOR_UPPER)
    mask = cv2.erode(mask, None, iterations=2)
    mask = cv2.dilate(mask, None, iterations=2)
    cnts, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not cnts:
        return None
    lg = max(cnts, key=cv2.contourArea)
    if cv2.contourArea(lg) < 500:
        return None
    M = cv2.moments(lg)
    return (int(M["m10"] / M["m00"]), int(M["m01"] / M["m00"]))

def update_camera(tx, ty, w, h):
    global pan_angle, tilt_angle
    ex = tx / w - 0.5
    ey = ty / h - 0.5
    if abs(ex) > DEAD_ZONE_X:
        pan_angle = np.clip(pan_angle - ex * PAN_SPEED * 180, PAN_MIN, PAN_MAX)
    if abs(ey) > DEAD_ZONE_Y:
        tilt_angle = np.clip(tilt_angle + ey * TILT_SPEED * 180, TILT_MIN, TILT_MAX)
    servo_pan.angle = pan_angle
    servo_tilt.angle = tilt_angle

async def run_tracker(mode="face"):
    cap = cv2.VideoCapture(CAMERA_INDEX)
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    servo_pan.angle = servo_tilt.angle = 90
    while True:
        ret, frame = cap.read()
        if not ret:
            await asyncio.sleep(0.1)
            continue
        h, w = frame.shape[:2]
        target = None
        if mode == "face":
            gray = cv2.equalizeHist(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))
            faces = detect_faces(gray)
            if len(faces) > 0:
                x, y, fw, fh = max(faces, key=lambda r: r[2] * r[3])
                target = (x + fw // 2, y + fh // 2)
                cv2.rectangle(frame, (x, y), (x + fw, y + fh), (0, 255, 0), 2)
        elif mode == "color":
            target = detect_color_object(cv2.cvtColor(frame, cv2.COLOR_BGR2HSV))
        if target:
            update_camera(target[0], target[1], w, h)
        cv2.imshow("Camera Tracker", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
        await asyncio.sleep(0.033)
    cap.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    asyncio.run(run_tracker(mode="face"))  # change to "color" for object tracking
```
```python
# ============================================================
# dashboard.py — Web control panel (localhost:5000)
# Install: pip install flask flask-socketio
# ============================================================
from flask import Flask, render_template_string, request, jsonify
import asyncio, threading, json, websockets

app = Flask(__name__)
SERVER_WS_URL = "ws://localhost:8765"

async def send_to_arm(data):
    async with websockets.connect(SERVER_WS_URL) as ws:
        await ws.send(json.dumps(data))

def send_command(data):
    asyncio.run(send_to_arm(data))

HTML = """"""  # dashboard markup elided here

@app.route("/")
def index():
    return render_template_string(HTML)

@app.route("/command", methods=["POST"])
def command():
    data = request.get_json()
    threading.Thread(target=send_command, args=(data,), daemon=True).start()
    return jsonify({"ok": True})

@app.route("/ai", methods=["POST"])
def ai_route():
    from ai_commands import process_command
    text = request.get_json().get("text", "")
    result = asyncio.run(process_command(text))
    if result:
        threading.Thread(target=send_command, args=(result,), daemon=True).start()
        return jsonify({"command": result})
    return jsonify({"error": "AI failed"}), 500

@app.route("/status")
def status():
    try:
        from server import connected_clients
        return jsonify({"clients": len(connected_clients)})
    except Exception:
        return jsonify({"clients": 0})

if __name__ == "__main__":
    print("Dashboard at http://localhost:5000")
    app.run(host="0.0.0.0", port=5000)
```
Your laptop is the brain. The Pi is the muscle. They talk over your home WiFi via WebSocket.
Mirror mode — streamed continuously by server.py:

```json
{
  "mode": "mirror",
  "shoulder": 90,
  "elbow": 120,
  "wrist": 85,
  "gripper": 45
}
```

AI command mode — one of the presets:

```json
{
  "mode": "ai_command",
  "command": "wave"
}
```

Available commands: `wave`, `pickup`, `home`, `point`, `reach`, `thumbs_up`.

Camera mode — direct pan/tilt control:

```json
{
  "mode": "camera",
  "pan": 90,
  "tilt": 60
}
```

Pan/tilt range: 0 = full left/up, 180 = full right/down.
Check your SERVER_IP in arm_controller.py matches your laptop's actual IP. Both devices must be on the same WiFi. Run ipconfig on your laptop to verify. Firewall may also be blocking port 8765 — temporarily disable it to test.
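To rule out the firewall without disabling it, a quick TCP probe run from the Pi tells you whether the laptop's port is reachable. This is a generic socket check, not one of the project files; `port_open` is a helper name chosen here:

```python
# Minimal TCP reachability probe — run on the Pi against the laptop's IP.
import socket

def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

if __name__ == "__main__":
    # Replace with your laptop's actual IP from ipconfig
    print(port_open("192.168.1.XXX", 8765))
```

If this prints `False` while the server is running, suspect the firewall or a wrong IP rather than the code.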
Pulse width mismatch — different SG90 batches have slightly different ranges. Try changing SERVO_MIN_PULSE from 500 to 544 first, leaving SERVO_MAX_PULSE at 2400; if a servo still buzzes at an end stop, narrow the range slightly further.
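For intuition on what those pulse numbers mean: at 50 Hz the PCA9685 repeats a 20 ms frame, and the pulse width is the slice of that frame the signal stays high. A rough sketch of the conversion to the chip's 16-bit duty-cycle value (the arithmetic only — `pulse_to_duty` is an illustrative helper, not the Adafruit library's API):

```python
# Pulse width (µs) → 16-bit duty-cycle value at a given PWM frequency.
def pulse_to_duty(pulse_us, freq_hz=50, resolution=0xFFFF):
    """Fraction of the PWM frame spent high, scaled to a 16-bit value."""
    frame_us = 1_000_000 / freq_hz   # 20_000 µs per frame at 50 Hz
    return int(pulse_us / frame_us * resolution)

print(pulse_to_duty(544))    # → 1782  (0° end stop)
print(pulse_to_duty(2400))   # → 7864  (180° end stop)
```

So moving SERVO_MIN_PULSE from 500 to 544 shifts the 0° end stop by only ~0.2% of the frame — small, but enough to stop an SG90 grinding against its mechanical limit.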
Make sure you're well lit — MediaPipe struggles in dark rooms. Stand 1–2 metres from the webcam. Raise min_detection_confidence if you get false detections. Lower it if it can't find you at all.
I2C not enabled. SSH into Pi and run sudo raspi-config → Interface Options → I2C → Enable. Then reboot. Also double-check your SDA/SCL wires aren't swapped (Pin 3 = SDA, Pin 5 = SCL).
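Once I2C is enabled, the PCA9685 should show up on the bus at address 0x40 (assuming the standard `i2c-tools` package):

```shell
sudo apt install -y i2c-tools   # only if i2cdetect is missing
sudo i2cdetect -y 1             # the PCA9685 appears as "40" in the grid
```

If the grid is empty, recheck the VCC/GND/SDA/SCL wiring before touching any code.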
Check ANTHROPIC_API_KEY is set. On Windows run echo %ANTHROPIC_API_KEY% — it should print your key. If it's blank, close and reopen the terminal after setting it with setx.
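Setting the key, with a placeholder value you replace with your real key:

```shell
# Windows — persists for NEW terminals only; reopen the terminal afterwards
setx ANTHROPIC_API_KEY "sk-ant-your-key-here"

# Linux / macOS — current shell only; add to ~/.bashrc to persist
export ANTHROPIC_API_KEY="sk-ant-your-key-here"
```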
Pi Zero 2W can thermal throttle under heavy load. camera_tracker.py is CPU intensive — run it at 15fps instead of 30 by changing asyncio.sleep(0.033) to 0.066. A small heatsink on the Pi also helps.
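To confirm throttling is actually the culprit, Raspberry Pi OS ships the `vcgencmd` diagnostic:

```shell
vcgencmd measure_temp     # throttling typically begins above ~80°C
vcgencmd get_throttled    # 0x0 means no throttling since boot
```

A non-zero `get_throttled` value means the Pi has capped its clock at some point — a heatsink or the 15fps change above should bring it back.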